Skip to Content

06. 事件流、事件流 UI 与执行可视化

本章高频面试题

  1. 什么是事件流?为什么 Agent 系统需要事件流?
  2. 事件流和普通 request-response 的区别是什么?
  3. 事件流 UI 和日志面板有什么区别?
  4. 如何设计事件 schema,才能同时服务前端、调试和审计?
  5. SSE、WebSocket、HTTP streaming、HTTP/2 长连接分别适合什么场景?
  6. 如何保证事件顺序、去重和幂等?
  7. 如何让事件流可恢复、可回放、可重连?resumable stream 怎么实现?
  8. 并行步骤和多工具调用时,UI 如何展示最合理?
  9. 什么是 AG-UI?它定义了哪些核心事件类型?
  10. Generative UI / Tool call UI 是什么?
  11. Vercel AI SDK、CopilotKit、LangGraph useStream 应该怎么选?

1. 什么是事件流

事件流可以理解成:

系统在执行任务的过程中,不是一次性返回最终结果,而是持续不断地把过程中的状态变化和中间结果发送出来。

在 Agent 场景里,常见事件包括:

  • run started / step started
  • tool started / finished / failed
  • message token streamed
  • reasoning token streamed(如果模型暴露了 thinking)
  • state delta(状态增量更新)
  • waiting for human approval
  • final result

2. 为什么 Agent 特别需要事件流

传统接口大多是:请求进来 → 处理 → 返回结果。

Agent 的特点:执行时间更长、中间步骤更多、状态变化更丰富、可能要调用多个工具、可能需要人工介入。

如果没有事件流,用户只能看到一个”加载中”。这带来两个问题:

  1. 用户体验很差
  2. 工程排障几乎没有抓手

所以事件流对 Agent 来说不是锦上添花,而是核心基础设施。

3. 什么是事件流 UI

事件流 UI 不是简单把后端日志显示出来。它真正要解决的是:

  1. 用户知道系统现在在做什么
  2. 用户知道为什么它还没结束
  3. 用户知道失败发生在哪一步
  4. 用户能在合适的时机介入(批准、中断、补充信息)

换句话说,事件流 UI 的核心不是”完整呈现所有底层事件”,而是:

把复杂的 Agent 执行过程翻译成用户能够理解的状态和反馈。

4. 事件 schema 应该怎么设计

一个成熟的事件 schema 至少要满足:

  1. 可排序
  2. 可关联(父子关系)
  3. 可重放
  4. 可审计
  5. 可聚合(多客户端看同一个 run)

常见设计:

type AgentEvent = { eventId: string; // 全局唯一,幂等键 runId: string; // 任务唯一标识 threadId: string; // 会话标识,跨 run parentId?: string; // 父事件 id,表达父子关系(tool call 属于哪个 step) sequence: number; // 同一 run 内单调递增 timestamp: string; // ISO 8601 type: | "run_started" | "step_started" | "step_completed" | "tool_started" | "tool_completed" | "tool_failed" | "token" | "reasoning_token" | "state_delta" | "interrupt" | "run_completed" | "run_failed"; source: string; // 哪个 agent / node status?: "pending" | "running" | "completed" | "failed"; payload: Record<string, unknown>; // 可选的追踪字段 traceId?: string; // 对接 OTel spanId?: string; };

几个关键字段:

  • runId 串整个任务
  • threadId 跨 run(一个会话可能包含多次 run)
  • sequence 排序
  • parentId 表达父子关系
  • type 给 UI 和监控做语义分流
  • traceId / spanId 关联到 OTel,打通事件流和 observability

5. 如何处理顺序、去重和幂等

事件流一旦跨进程、跨网络,就不能假设”顺序天然正确且只来一次”。

5.1 生产端约束

  • 每条事件有唯一 eventId
  • 同一 run 内 sequence 单调递增(自增计数器或 Redis INCR)
  • 事件必须先持久化再推送(outbox pattern),避免推送成功但丢失
  • 高频 token 事件可以批量发送,但要保持有序

5.2 消费端约束

  • UI 按逻辑顺序展示,不按到达顺序盲目展示
  • 基于 eventId 去重(5 分钟 LRU cache 足够)
  • UI 更新逻辑必须幂等:step_completed 重复到达两次也不应该把 UI 搞坏;token 延迟到达不应该导致步骤状态回滚
  • 对乱序 token 有两种策略:(a) 按 sequence 排序后渲染,(b) append-only 渲染 + 最后一次 state_snapshot 兜底

5.3 Outbox Pattern

强一致性场景推荐 outbox pattern:

  1. 业务事务里同时写业务表和 events_outbox
  2. 单独的 relay 进程读 outbox 推送到流
  3. 推送成功后标记为 published

好处:事件不会在业务成功后丢失,也不会业务失败但事件已发出。

6. 传输协议:SSE vs WebSocket vs HTTP streaming

6.1 SSE(Server-Sent Events)

基于 HTTP/1.1 chunked 或 HTTP/2,单向服务端推流。

优点:

  • 实现简单
  • 自动重连(浏览器原生)
  • Last-Event-ID 头天然支持断点续传
  • 对文本流、状态流非常友好
  • 大多数反向代理/CDN 默认支持

适合:聊天流式回答、单向事件通知、常规 Agent 执行展示。这是当前 Agent UI 最主流的选择

6.2 WebSocket

优点:双向通信、实时控制能力强、适合高交互场景。 适合:用户实时中断任务、多人协作、高频双向控制、需要持续 ping/pong 的低延迟场景。 缺点:复杂、反向代理配置麻烦、serverless 环境不友好。

6.3 HTTP streaming + fetch()

Vercel AI SDK、Anthropic SDK 都在用这个模式:用 fetch() 读 ReadableStream,手动解析 SSE 或自定义 chunk 格式。

优点:serverless 友好(Edge runtime、Cloudflare Workers 都支持)、和 React Suspense/use() 集成自然。 这是目前 React 生态最推崇的方案。

6.4 轮询

适合:后台任务页、实时性要求不高、MVP 阶段。

6.5 怎么选

  • 默认 SSE(或等价的 HTTP streaming),除非有明确双向需求
  • 需要双向控制(用户随时打断、多人协作)→ WebSocket
  • 需要异步长任务查询(分钟到小时级)→ 轮询或 webhook,不要用长连接

7. Resumable Stream:事件流的可恢复设计

很多系统只做”直播流”,却没做”恢复能力”。但用户会:

  • 刷新页面
  • 切后台
  • 断网
  • 稍后回来继续看

更稳的设计必须同时具备实时层恢复层

7.1 SSE 的天然恢复能力

SSE 规范里有 id: 字段和客户端重连时的 Last-Event-ID 请求头:

event: token id: 123 data: {"token": "Hello"} event: token id: 124 data: {"token": " world"}

客户端断开重连时,浏览器自动带上 Last-Event-ID: 124,服务端从 125 开始补发。这是原生支持的,不需要自己造。

7.2 Redis Streams 做事件存储

生产级实现通常这样:

  1. Agent runtime 把事件 append 到 Redis Stream(每个 run 一个 stream)
  2. SSE endpoint 从 Redis Stream 读取并推送(支持 XREAD BLOCK 阻塞读)
  3. 客户端重连时带 Last-Event-ID,服务端从对应 offset 继续
  4. 事件同时异步落库(Postgres)做长期存档

Vercel 2024 年开源的 Resumable Streams library 就是这个模式的封装。

7.3 架构要点

  • 事件先持久化后推送,避免”推送成功但断连后丢失”
  • Stream TTL 合理设置(活跃 run 保留,完成 run 24h 后归档到冷存储)
  • 客户端持有 (runId, lastEventId),任何时刻都能恢复
  • 前端优先”回放历史 → 接实时流”,避免闪烁

8. 并行执行如何展示

Agent 一旦支持并行检索、多工具并发、子任务并发,UI 就不能再只做一条线性时间轴。

常见做法:

  1. 顶层展示任务总进度
  2. 子区域展示并行步骤卡片
  3. 每个卡片展示状态、耗时、结果摘要
  4. 失败卡片可展开查看错误和重试信息
  5. Sub-agent 用折叠容器,默认折叠只显示汇报结果,工程师可展开看过程

设计原则:

用户默认看业务阶段,工程师展开看原始执行细节。

9. AG-UI:Agent ↔ UI 的事件语义标准

9.1 AG-UI 是什么

AG-UI 是一个 Agent 和前端 UI 交互的事件协议标准(ag-ui.com)。它的价值不在于”比 WebSocket 更高级”,而在于:

它标准化了 Agent 和 UI 之间应该交换哪些事件和语义。

可以这样区分:

  • WebSocket / SSE:数据怎么传
  • AG-UI:传的事件语义是什么

9.2 AG-UI 定义的核心事件类型

根据当前官方文档,AG-UI 的事件分为几大类:

生命周期事件

  • RunStarted / RunFinished / RunError
  • StepStarted / StepFinished

文本消息事件

  • TextMessageStart / TextMessageContent / TextMessageEnd
  • TextMessageChunk(便利事件,组合 start+content+end)

工具调用事件

  • ToolCallStart / ToolCallArgs(流式参数)/ ToolCallEnd
  • ToolCallResult(工具执行结果)
  • ToolCallChunk(便利事件)

状态管理事件

  • StateSnapshot(完整状态快照)
  • StateDelta(JSON Patch 格式的增量更新)
  • MessagesSnapshot(会话快照)

活动事件

  • ActivitySnapshot / ActivityDelta

推理事件

  • ReasoningStart / ReasoningMessageStart/Content/End / ReasoningEnd
  • ReasoningEncryptedValue(加密的 CoT,用于 Anthropic extended thinking)

特殊事件

  • Raw(外部系统事件的容器)
  • Custom(应用自定义扩展)

对面试/工程来说,理解三件事就够了:

  1. AG-UI 区分”消息流 / 工具调用 / 状态 / 推理”四大类事件
  2. 状态管理用 snapshot + delta(JSON Patch)模式,保证可恢复
  3. 推理事件单独分类,方便 UI 折叠/隐藏 CoT

10. Generative UI 和 Tool Call UI

这是 2024-2025 才成熟的模式:让 Agent 不仅返回文本,还返回可以渲染成 UI 组件的结构化结果

10.1 Vercel AI SDK 的 Generative UI

streamUI / createStreamableUI:tool call 的结果直接是一个 React 组件(Server Component),前端自动流式渲染。例如”查机票”工具返回一张卡片组件,而不是 JSON。

10.2 CopilotKit 的 Tool call UI

每个 tool 可以注册对应的 renderer,agent 调 tool 时前端自动渲染对应组件(加载态、结果态、错误态)。

10.3 工程取舍

  • Generative UI 让用户体验更丝滑,但把前端组件和 agent 耦合了
  • 严格分层的项目(前端独立演进)通常仍用 JSON 结果 + 前端自行渲染
  • Multi-frontend 场景(web + mobile + 桌面)不适合 Generative UI

11. 前端选型:useStream、Vercel AI SDK、CopilotKit

当前(2026-04)主流方案:

方案适用场景核心能力
LangGraph useStream后端是 LangGraph原生接 graph stream、自动处理 interrupt、thread 管理
Vercel AI SDK后端任意,前端 React/Next.js统一的 useChat / useCompletion hooks、generative UI、多 provider
CopilotKit需要”把 Copilot 嵌入现有 app”sidebar/textbox/autocomplete 组件、action 系统
assistant-ui更灵活的聊天 UI 组件库模型无关、Compose 风格、适合需要自定义交互
自研特殊事件语义、非标准协议最大灵活度

选型原则:

  • 后端是 LangGraph 优先用 useStream,能省大量集成工作
  • 需要跨 provider 的聊天组件用 Vercel AI SDK
  • 产品形态是”嵌入式 copilot”而不是独立聊天页 → CopilotKit
  • 以上都不贴合,再考虑自研

12. TypeScript 示例:带 Last-Event-ID 的 SSE 服务端

import type { IncomingMessage, ServerResponse } from "http"; import { redis } from "./redis"; type StreamEvent = { eventId: string; sequence: number; type: string; payload: Record<string, unknown>; }; function writeSseEvent(res: ServerResponse, event: StreamEvent) { res.write(`id: ${event.sequence}\n`); res.write(`event: ${event.type}\n`); res.write(`data: ${JSON.stringify(event)}\n\n`); } export async function handleAgentStream( req: IncomingMessage, res: ServerResponse, runId: string ) { res.writeHead(200, { "Content-Type": "text/event-stream", "Cache-Control": "no-cache, no-transform", Connection: "keep-alive", "X-Accel-Buffering": "no", // 禁用 nginx 缓冲 }); const lastEventId = req.headers["last-event-id"]; const startFrom = lastEventId ? `${lastEventId}` : "0"; const streamKey = `run:${runId}:events`; // 1. 回放历史 const historical = await redis.xrange(streamKey, `(${startFrom}`, "+"); for (const [id, fields] of historical) { const event = parseEvent(id, fields); writeSseEvent(res, event); } // 2. 接实时流 let cursor = historical.length ? historical[historical.length - 1][0] : startFrom; const abort = new AbortController(); req.on("close", () => abort.abort()); while (!abort.signal.aborted) { const newEvents = await redis.xread( "BLOCK", 30_000, "STREAMS", streamKey, cursor ); if (!newEvents) continue; for (const [, entries] of newEvents) { for (const [id, fields] of entries) { const event = parseEvent(id, fields); writeSseEvent(res, event); cursor = id; if (event.type === "run_completed" || event.type === "run_failed") { res.end(); return; } } } } } declare function parseEvent(id: string, fields: string[]): StreamEvent;

这段代码体现的工程要点:

  • 禁用代理缓冲(X-Accel-Buffering: no
  • 尊重 Last-Event-ID 做断点续传
  • Redis Stream 承载事件存储,先回放后接实时
  • 阻塞读(BLOCK 30000)避免轮询
  • 客户端断连时 abort 立即退出循环

13. 事件流 UI 的最佳实践

  1. 默认展示阶段,不展示原始事件洪流;高级面板可展开 tool trace、错误详情、重试记录
  2. 长文本流和步骤状态分开渲染——token 流 append-only,步骤状态用 state_delta 覆盖
  3. 终态必须明确区分成功、失败、取消、等待人工输入
  4. 刷新后优先恢复历史,再接实时流,避免闪烁
  5. Tool call 的参数和结果默认折叠,点开才展示完整 JSON
  6. Reasoning / CoT 事件默认隐藏或折叠,工程师可打开查看
  7. 审批卡片必须在事件流中显著标记(不是躺在底部等用户发现)
  8. 每个 tool call / step 关联 traceId,点击能跳转到 observability 工具

14. 本章方法论小结

  1. 事件流是 Agent 系统的核心基础设施,不是附加功能
  2. 事件流 UI 是执行过程的”用户化表达”,不是日志面板
  3. 事件 schema 必须同时服务前端、调试、监控和审计,含 runId/threadId/sequence/parentId/traceId
  4. 顺序、去重、幂等必须提前设计;outbox pattern 是强一致场景的标配
  5. SSE 是当前默认选择,双向需求才上 WebSocket;HTTP streaming 在 serverless 场景最友好
  6. Resumable stream 必须一等公民:SSE 的 Last-Event-ID + Redis Stream 事件存储是成熟方案
  7. AG-UI 定义了 Agent↔UI 的事件语义标准(lifecycle / message / tool / state / reasoning)
  8. Generative UI 让交互更丝滑但引入前后端耦合,按项目形态取舍
  9. 前端选型:后端 LangGraph 用 useStream,跨 provider 用 Vercel AI SDK,嵌入式 copilot 用 CopilotKit
Last updated on